package com.stars.easyms.base.asynchronous;

import com.stars.easyms.base.batch.BatchResult;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>className: BaseBatchAsynchronousTask</p>
 * <p>description: Base class for asynchronous tasks that process queued data in batches.</p>
 *
 * <p>Subclasses implement {@link #execute(List)}; this class drains the queue into lists of at
 * most {@code maxExecuteCountPerTime} items, hands each list to {@link #doExecute(List)} and
 * forwards failed items to the error-data handling via {@code addErrorData}.</p>
 *
 * @author guoguifang
 * @date 2019/8/22 16:32
 * @since 1.3.0
 */
public abstract class BaseBatchAsynchronousTask<T> extends AbstractAsynchronousTask<T> {

    private static final long DEFAULT_DELAY_TIME = 100;

    private static final int DEFAULT_BATCH_EXECUTE_MAX_COUNT_PER_TIME = 3000;

    /**
     * Lock held while a batch is being dequeued in ordered mode.
     */
    private final Lock orderedLock = new ReentrantLock();

    /**
     * Delay before submitting a partially filled batch, in milliseconds (the longer the delay,
     * the lower the pressure on the database, but the lower the real-time responsiveness).
     */
    private long delayTime;

    /**
     * Delay time used by the non-ordered handler before it gives up waiting for more data.
     *
     * @return the delay in milliseconds (100 by default); negative values are clamped to 0
     *         by {@link #batchInit()}
     */
    protected long delayTime() {
        return DEFAULT_DELAY_TIME;
    }

    /**
     * Maximum number of items handed to a single batch execution.
     *
     * @return the maximum batch size (3000 by default); values below 1 are raised to 1
     *         by {@link #batchInit()}
     */
    protected int batchExecuteMaxCountPerTime() {
        return DEFAULT_BATCH_EXECUTE_MAX_COUNT_PER_TIME;
    }

    /**
     * Runs {@link #execute(List)}; annotated so that each batch gets its own new transaction.
     *
     * <p>NOTE(review): inside this class the method is invoked through {@code this}. With
     * proxy-based Spring AOP a self-invocation does not pass through the transactional proxy,
     * so the annotation may have no effect for those calls - confirm how instances are wired.</p>
     *
     * @param list the batch of data to process
     * @return the batch result returned by {@link #execute(List)}
     * @throws Exception whatever {@link #execute(List)} throws
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = {Exception.class})
    public BatchResult doExecute(List<T> list) throws Exception {
        return execute(list);
    }

    /**
     * Processes one batch of data asynchronously.
     *
     * @param list the batch of data to process
     * @return BatchResult the result of the batch; its fail datas are re-queued as error data
     * @throws Exception on failure; the whole batch is then treated as failed
     */
    protected abstract BatchResult execute(List<T> list) throws Exception;

    /**
     * Marks this task as a batch task and caches the normalized delay time and batch size.
     */
    @Override
    void batchInit() {
        this.batch = true;
        this.delayTime = Math.max(this.delayTime(), 0);
        this.maxExecuteCountPerTime = Math.max(this.batchExecuteMaxCountPerTime(), 1);
    }

    /**
     * Drains the queue, choosing the ordered or non-ordered strategy based on {@code isOrder()}.
     *
     * @param isCore forwarded unchanged to {@code dequeue}
     */
    @Override
    void handleQueue(boolean isCore) {
        if (isOrder()) {
            orderedHandleQueue(isCore);
        } else {
            nonOrderedHandleQueue(isCore);
        }
    }

    /**
     * Dequeues at most one batch while holding {@link #orderedLock}, then executes it after the
     * lock has been released.
     *
     * @param isCore forwarded unchanged to {@code dequeue}
     */
    private void orderedHandleQueue(boolean isCore) {
        List<T> list = new ArrayList<>();
        int getCount = 0;
        T t;
        orderedLock.lock();
        try {
            while ((t = dequeue(isCore)) != null) {
                list.add(t);
                if (++getCount >= maxExecuteCountPerTime) {
                    break;
                }
            }
        } finally {
            orderedLock.unlock();
        }
        if (getCount > 0) {
            batchHandleQueue(list);
        }
    }

    /**
     * Drains the queue batch by batch. When the queue runs dry for the first time the thread
     * sleeps once for {@link #delayTime} milliseconds and polls again, so that a partially
     * filled batch gets a chance to collect more items before it is submitted.
     *
     * @param isCore forwarded unchanged to {@code dequeue}
     */
    private void nonOrderedHandleQueue(boolean isCore) {
        List<T> list = new ArrayList<>();
        int getCount = 0;
        // Only a single delay is granted per invocation.
        boolean delayed = false;
        T t = dequeue(isCore);
        while (t != null) {
            list.add(t);
            if (++getCount >= maxExecuteCountPerTime) {
                // batchHandleQueue clears the list, so it can be reused for the next batch.
                batchHandleQueue(list);
                getCount = 0;
            }
            t = dequeue(isCore);
            if (t == null) {
                if (delayed || delayTime <= 0) {
                    break;
                }
                try {
                    Thread.sleep(delayTime);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and carry on with one last poll.
                    Thread.currentThread().interrupt();
                }
                delayed = true;
                t = dequeue(isCore);
            }
        }
        if (getCount > 0) {
            batchHandleQueue(list);
        }
    }

    /**
     * Executes one batch for the first time. Items reported as failed (or the whole batch when
     * an exception is thrown) are registered as error data with a fail count of 0.
     * The list is cleared afterwards.
     *
     * @param list the batch to execute; emptied by this method
     */
    @SuppressWarnings("unchecked")
    private void batchHandleQueue(List<T> list) {
        try {
            BatchResult batchResult = doExecute(list);
            if (batchResult != null) {
                addErrorDatas(0, batchResult.getFailDatas());
            }
        } catch (Exception e) {
            logger.error("执行异步任务失败!", e);
            addErrorDatas(0, list);
        }
        list.clear();
    }

    /**
     * Retries a batch of previously failed data.
     *
     * <p>Nothing happens for an empty list, or when {@code isCheckSize} is set and the list has
     * not yet reached {@code maxExecuteCountPerTime} items. Otherwise the batch is executed;
     * items that fail again are re-queued with {@code failCount + 1} while retries remain, or
     * recorded in {@code failedDiscardCount} once retries are exhausted. A retry that leaves
     * no failed items is not counted as a discard. The list is cleared after execution.</p>
     *
     * @param list        the error data to retry; emptied when the batch was executed
     * @param failCount   how many times these items have failed so far
     * @param isCheckSize whether to wait until the list holds a full batch before executing
     */
    @Override
    @SuppressWarnings("unchecked")
    void batchHandleErrorData(List<T> list, int failCount, boolean isCheckSize) {
        if (list.isEmpty()) {
            return;
        }
        if (isCheckSize && list.size() < maxExecuteCountPerTime) {
            return;
        }
        try {
            BatchResult batchResult = doExecute(list);
            if (batchResult != null) {
                retryOrDiscard(failCount, batchResult.getFailDatas());
            }
        } catch (Exception e) {
            logger.error("执行异步任务失败!", e);
            retryOrDiscard(failCount, list);
        }
        list.clear();
    }

    /**
     * Re-queues still-failing items for another retry, or records a discard when no retries
     * remain. Does nothing when there are no failed items.
     *
     * @param failCount   how many times these items had failed before this attempt
     * @param failedItems the items that failed in this attempt; may be null or empty
     */
    private void retryOrDiscard(int failCount, List<T> failedItems) {
        if (failedItems == null || failedItems.isEmpty()) {
            // Everything succeeded: nothing to retry and nothing was discarded.
            return;
        }
        if (failCount < failRetryCount - 1) {
            addErrorDatas(failCount + 1, failedItems);
        } else {
            failedDiscardCount.increment();
        }
    }

    /**
     * Registers every item of the list as error data, provided retrying is enabled
     * ({@code failRetryCount > 0}) and the list is non-empty.
     *
     * @param failCount the fail count to register the items with
     * @param list      the failed items; may be null
     */
    private void addErrorDatas(int failCount, List<T> list) {
        if (failRetryCount > 0 && list != null && !list.isEmpty()) {
            for (T t : list) {
                addErrorData(failCount, t);
            }
        }
    }

    /**
     * Single-item error handling is not used by batch tasks.
     */
    @Override
    void handleErrorData(T t, int failCount) {
        // Intentionally blank
    }

}
